package connection

import (
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/IBM/sarama"
	"github.com/spf13/viper"
)

// PubKafkaConsumer is the Kafka message subscription/publishing service.
// It wraps one shared sarama client used for both consuming and producing.
type PubKafkaConsumer struct {
	// Client points at a sarama.Client interface value. sarama.Client is
	// already an interface, so the extra indirection is unusual, but callers
	// throughout this file dereference it, so the type is kept as-is.
	Client *sarama.Client
}

// NewKafkaConsumer builds a new PubKafkaConsumer with a connected client.
// The receiver itself is not used; it only serves as the method's anchor.
// It returns nil when the connection cannot be established, so callers must
// nil-check the result before use.
func (k *PubKafkaConsumer) NewKafkaConsumer() *PubKafkaConsumer {
	consumer := PubKafkaConsumer{}
	if err := consumer.NewKafka(); err != nil {
		// The error used to be dropped silently; surface it for operators.
		log.Println("PubKafkaConsumer: NewKafka failed:", err)
		return nil
	}
	return &consumer
}
// NewKafka connects to the Kafka brokers listed under the
// "subscribe.kafka.brokerIpPort" viper key and stores the resulting client
// on the receiver. On failure it waits one second and retries indefinitely,
// exactly like the original, but iteratively: the previous implementation
// retried via self-recursion, which grows the stack without bound while the
// brokers stay unreachable.
//
// It returns nil once a client has been stored.
func (k *PubKafkaConsumer) NewKafka() error {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.AutoCommit.Enable = false
	// RequiredAcks = WaitForAll (-1): the send only counts as complete once
	// the follower replicas have acknowledged the data.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Required for the sync producer in Push; without it sends block forever.
	config.Producer.Return.Successes = true
	// Hash the key so the same key always lands on the same partition,
	// keeping per-key message order.
	config.Producer.Partitioner = sarama.NewHashPartitioner

	for {
		client, err := sarama.NewClient(viper.GetStringSlice("subscribe.kafka.brokerIpPort"), config)
		if err == nil {
			k.Client = &client
			return nil
		}
		// Connection failed: wait a second and try again.
		time.Sleep(1 * time.Second)
		log.Println(err, "kafka 订阅，失败重连")
	}
}

type FuncType func(*sarama.PartitionConsumer, *sarama.ConsumerMessage)

// Subscribe starts one goroutine per partition of topic, invoking callbake
// for every message. It returns after the goroutines are launched; the
// consumers keep running in the background. On any panic during setup it
// reconnects after one second and resubscribes.
//
// Fixes over the original:
//   - the error from NewConsumerFromClient is now checked BEFORE deferring
//     Close (the old defer panicked again on a nil interface);
//   - the parent consumer used to be closed via defer as soon as Subscribe
//     returned, which tore down every partition goroutine immediately; it is
//     now closed only after all partition goroutines have finished;
//   - the recover handler now nil-checks the reconnect result before use.
func (k *PubKafkaConsumer) Subscribe(topic string, callbake FuncType) {
	// Reconnect and resubscribe if anything below panics.
	defer func() {
		if r := recover(); r != nil {
			time.Sleep(1 * time.Second)
			log.Println("PubKafkaConsumer Subscribe出现异常1秒后重连\n", r)
			reconnected := k.NewKafkaConsumer()
			if reconnected == nil {
				// Could not rebuild the client; give up this attempt.
				return
			}
			reconnected.Subscribe(topic, callbake)
		}
	}()

	consumer, err := sarama.NewConsumerFromClient(*k.Client)
	if err != nil {
		panic(err)
	}

	partitions, err := consumer.Partitions(topic)
	if err != nil {
		// Release the consumer before the recover handler rebuilds things.
		_ = consumer.Close()
		panic(err)
	}

	var wg sync.WaitGroup
	for _, partitionID := range partitions {
		// One partition consumer (and goroutine) per partition.
		pc, err := consumer.ConsumePartition(topic, partitionID, sarama.OffsetNewest)
		if err != nil {
			panic(err)
		}

		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			defer func() {
				_ = pc.Close()
			}()
			// Blocks until the partition consumer is closed.
			for message := range pc.Messages() {
				callbake(&pc, message)
			}
		}(pc)
	}

	// Close the parent consumer only once every partition goroutine exits.
	go func() {
		wg.Wait()
		_ = consumer.Close()
	}()
}
// Push synchronously publishes value under key to topic. A short-lived sync
// producer is created from the shared client for each call and closed before
// returning. The error from the send (or from producer creation) is returned;
// nil means the broker acknowledged the message.
func (k *PubKafkaConsumer) Push(topic string, key string, value []byte) error {
	producer, err := sarama.NewSyncProducerFromClient(*k.Client)
	if err != nil {
		return err
	}
	defer func() {
		// Best-effort close; the send result is what callers care about.
		_ = producer.Close()
	}()

	msg := &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(value),
	}
	if _, _, err = producer.SendMessage(msg); err != nil {
		return err
	}
	return nil
}

// AsyncProducer pushes a single message through a throwaway async producer
// built from client, and reports any delivery error.
//
// The original implementation did a select with a default case immediately
// after queueing the message, so the broker's response had almost never
// arrived yet: delivery errors were silently lost and the function always
// returned nil. AsyncProducer.Close flushes buffered messages and returns
// the accumulated ProducerErrors, so we rely on that instead.
//
// Note: the message's Partition field is honoured only when the client's
// config uses sarama.NewManualPartitioner; partitions start at 0, and each
// topic/partition pair tracks its own offset.
func (k *PubKafkaConsumer) AsyncProducer(client sarama.Client, message *sarama.ProducerMessage) error {
	c, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		return err
	}

	c.Input() <- message

	// Close drains the Successes/Errors channels (Return.Successes and
	// Return.Errors are both enabled on this client's config) and returns
	// any delivery failures as a ProducerErrors value.
	if err := c.Close(); err != nil {
		fmt.Println("Produced message failure: ", err)
		return err
	}
	return nil
}
